# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.


import ConfigParser
import glob
import os
import re


CONFIG_FILE_NAME = '.expect_tests.cfg'


def get_python_root(path):
  """Get the lowest directory with no __init__.py file.

  When ``path`` is pointing inside a Python package, this function returns the
  directory directly containing this package. If ``path`` points outside of
  a Python package, the it returns ``path``.

  Args:
    path (str): arbitrary path
  Returns:
    root (str): ancestor directory, with no __init__.py file in it.
  """
  if not os.path.exists(path):
    raise ValueError('path must exist: %s')

  while path != os.path.dirname(path):
    if not os.path.exists(os.path.join(path, '__init__.py')):
      return path
    path = os.path.dirname(path)

  # This is not supposed to happen, but in case somebody adds a __init__.py
  # at the filesystem root ...
  raise IOError("Unable to find a python root for %s" % path)


def parse_test_glob(test_glob):
  """A test glob is composed of a path and a glob expression like:
    '<path>:<glob>'. The path should point to a directory or a file inside
    a Python package (it can be the root directory of that package).
    The glob is a Python name used to filter tests.

    A test name is the name of the method prepended with the class name.

    Example:
    'my/nice/package/test1/:TestA*', the package root being 'my/nice/package':
    this matches all tests whose class name starts with 'TestA' inside all
    files matching test1/*_test.py, like
    ``TestABunchOfStuff.testCatFoodDeployment``.

    Args:
       test_glob (str): a test glob
    Returns:
       (path, test_filter): absolute path and test filter glob.
  """
  parts = test_glob.split(':')
  if len(parts) > 2:
    raise ValueError('A test_glob should contain at most one colon (got %s)'
                     % test_glob)
  if len(parts) == 2:
    path, test_filter = parts
    if '/' in test_filter:
      raise ValueError('A test filter cannot contain a slash (got %s)',
                       test_filter)

    if not test_filter:  # empty string case
      test_filter = ('*',)
    else:
      test_filter = (test_filter,)
  else:
    path, test_filter = parts[0], ('*',)

  path = os.path.abspath(path)
  return path, test_filter


class PackageTestingContext(object):
  def __init__(self, cwd, package_name, filters):
    """Information to run a set of tests in a single package.

    See also parse_test_glob.
    """
    # TODO(iannucci): let's scan packages too so that <path> can also be a
    # glob. Then expect_tests can use a default of '*:*' when no tests are
    # specified.

    self.cwd = cwd
    self.package_name = package_name
    # list of (path, filter) pairs.
    # The path is a relative path to a subdirectory of
    #   os.path.join(self.cwd, self.package_name) in which to look for tests.
    # Only tests whose name matches the glob are kept.
    self.filters = filters

  def itermatchers(self):
    """Iterate over all filters, and yield matchers for each of them.

    Yields:
      path (str): restrict test listing to this subpackage.
      matcher (SRE_Pattern): whitelist matcher
    """
    for filt in self.filters:
      # Implicitely append * to globs
      one_glob = '%s%s' % (filt[1], '*' if '*' not in filt[1] else '')
      matcher = re.compile('^(?:%s)$' % glob.fnmatch.translate(one_glob))

      if matcher.pattern == '^$':
        matcher = re.compile('^.*$')

      yield filt[0], matcher

  @classmethod
  def from_path(cls, path, filters=('*',)):
    path = os.path.abspath(path)
    if not os.path.exists(path):
      raise ValueError('Path does not exist: %s' % path)
    cwd = get_python_root(path)
    package_name = os.path.relpath(path, cwd).split(os.path.sep)[0]
    # The path in which to look for tests. Only tests whose name matches the
    # glob are kept.
    relpath = os.path.relpath(path, os.path.join(cwd, package_name))

    if not isinstance(filters, (list, tuple)):
      raise ValueError('the "filter" parameter must be a tuple or a list, '
                       'got %s' % type(filters).__name__)
    if len(filters) == 0:
      filters = [(relpath, '*')]
    else:
      filters = [(relpath, filt) for filt in filters]

    return cls(cwd, package_name, filters)

  @classmethod
  def from_context_list(cls, contexts):
    """Merge several PackageTestingContext pointing to the same package."""
    cwd = set(context.cwd for context in contexts)
    if len(cwd) > 1:
      raise ValueError(
        'from_context_list() was given'
        'process contexts containing the following cwds, '
        'but can only process contexts which all share a single cwd: '
        '%s' % str(cwd))

    package_name = set(context.package_name for context in contexts)
    if len(package_name) > 1:
      raise ValueError(
        'from_context_list() was given'
        'process contexts containing the following package_name, '
        'but can only process contexts which all share a single package_name: '
        '%s' % str(package_name))

    filters = []
    for context in contexts:
      filters.extend(context.filters)

    return cls(cwd.pop(), package_name.pop(), filters)


class ProcessingContext(object):
  def __init__(self, testing_contexts):
    """Information to run a set of tasks in a given working directory.

    Args:
      testing_contexts (list): list of PackageTestingContext instances.
    """
    self.cwd = testing_contexts[0].cwd

    # Merge testing_contexts by package
    groups = {}
    for context in testing_contexts:
      if context.cwd != self.cwd:
        raise ValueError('All package must have the same value for "cwd"')
      groups.setdefault(context.package_name, []).append(context)

    self.testing_contexts = [PackageTestingContext.from_context_list(contexts)
                             for contexts in groups.itervalues()]


def get_config(path):
  """Get configuration values

  Reads the config file in provided path, and returns content.
  See Python ConfigParser for general formatting syntax.

  Example:
  [expect_tests]
  skip=directory1
     directory2
     directory3

  Args:
  path (str): path to a directory.

  Returns:
  black_list (set): blacklisted subdirectories.
  """
  black_list = set()

  config_file_name = os.path.join(path, CONFIG_FILE_NAME)
  parser = ConfigParser.ConfigParser()
  parser.read([config_file_name])

  if not parser.has_section('expect_tests'):
    return black_list

  if parser.has_option('expect_tests', 'skip'):
    black_list.update(parser.get('expect_tests', 'skip').splitlines())
  return black_list


def get_runtime_contexts(test_globs):
  """Compute the list of packages/filters to get tests from."""
  # Step 1: compute list of packages + subtree
  testing_contexts = []
  for test_glob in test_globs:
    path, test_filter = parse_test_glob(test_glob)

    if os.path.exists(os.path.join(path, '__init__.py')):
      testing_contexts.append(
        PackageTestingContext.from_path(path, test_filter))
    else:
      # Look for all packages in path.
      subpaths = []
      black_list = get_config(path)

      for filename in filter(lambda x: x not in black_list, os.listdir(path)):
        abs_filename = os.path.join(path, filename)
        if (os.path.isdir(abs_filename)
            and os.path.isfile(os.path.join(abs_filename, '__init__.py'))):
          subpaths.append(abs_filename)

      testing_contexts.extend(
        [PackageTestingContext.from_path(subpath, test_filter)
         for subpath in subpaths])

  # Step 2: group by working directory - one process per wd.
  groups = {}
  for context in testing_contexts:
    groups.setdefault(context.cwd, []).append(context)
  return [ProcessingContext(contexts) for contexts in groups.itervalues()]
